Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement a basic generic API client #313

Merged
merged 124 commits into from
Mar 22, 2024
Merged

Implement a basic generic API client #313

merged 124 commits into from
Mar 22, 2024

Conversation

burnash
Copy link
Collaborator

@burnash burnash commented Dec 29, 2023

Tell us what you do here

This PR implements a basic generic API client. Mostly by trying to generilize pagination and auth logic which many of the sources in this repo reimplement. The goal is to speed up sources development by unifying some common functionality. At the same time I'm trying to design the client's API to be as close to Requests as possible.

Prior relevant work: dlt-hub/dlt-init-openapi-demo#2

Example: Get issues GitHub resource from dlt docs.

Original

import dlt
from dlt.sources.helpers import requests

@dlt.resource(
    table_name="issues",
    write_disposition="merge",
    primary_key="id",
)
def get_issues(
    updated_at=dlt.sources.incremental("updated_at", initial_value="1970-01-01T00:00:00Z")
):
    # NOTE: we read only open issues to minimize number of calls to the API. There's a limit of ~50 calls for not authenticated Github users
    url = f"https://api.github.com/repos/dlt-hub/dlt/issues?since={updated_at.last_value}&per_page=100&sort=updated&directions=desc&state=open"

    while True:
        response = requests.get(url)
        response.raise_for_status()
        yield response.json()

        # get next page
        if "next" not in response.links:
            break
        url = response.links["next"]["url"]

pipeline = dlt.pipeline(
    pipeline_name="github_issues_merge",
    destination="duckdb",
    dataset_name="github_data_merge",
)
load_info = pipeline.run(get_issues)
print(load_info)

Using RESTClient

import dlt
from rest_api import RESTClient

github_client = RESTClient(base_url="https://api.github.com")


@dlt.resource(
    table_name="issues",
    write_disposition="merge",
    primary_key="id",
)
def get_issues(
    updated_at=dlt.sources.incremental(
        "updated_at", initial_value="1970-01-01T00:00:00Z"
    )
):
    for page in github_client.paginate(
        "/repos/dlt-hub/dlt/issues",
        params={
            "since": updated_at.last_value,
            "per_page": 100,
            "sort": "updated",
            "direction": "desc",
            "state": "open",
        },
    ):
        yield page


# The rest of the pipeline remains the same
pipeline = dlt.pipeline(
    pipeline_name="github_issues_merge",
    destination="duckdb",
    dataset_name="github_data_merge",
)
load_info = pipeline.run(get_issues)
print(load_info)

@burnash burnash marked this pull request as draft December 29, 2023 15:07
@burnash burnash requested a review from rudolfix December 29, 2023 15:08
@rudolfix
Copy link
Contributor

@burnash

This is very helpful. At least from my side it makes way clearer how to design code here. The idea of request-like helper that has pagination, auth and other things built-in is good.

api
basically I see two components

  1. paginated requests which has similar interface to requests but instead of returning Response it retruns a generator, yielding pages of data

on top of usual requests arguments it also accepts

  • authenticator instance (we already have auth argument)
  • paginator instance
  • response unwrapper (extracts page of data from response)
  • incremental instance if endpoint is incremental
  • incremental param to connect incremental instance to

the returned generator may be passed directly to dlt.resource (I think it will be generator function, not generator so you can bind incremental to it)

ie.

paginated = paginated_request(url, pagination=json_paginator(), auth=bearer(api_secret), incremental=dlt.source.incremental("updated_at"), incremental_param="sice")

resource = dlt.resource(paginated.data, name="...", primary_key="...")

where paginated.data is a generator function ie.

def data(self, last_value: dlt.source.incremental[Any] = None):
	... # main loop happens here

I'd strive move as much logic into a common code and make paginators and authenticators as simple as possible. For example paginator could get a last response and a prepared request and modify it to get next page. we could even make it more abstract and pass header, paramas etc.

  1. declarative part. it is similar to Alex to EndpointFactory but also adds definitions of the endpoints. why we need it:
  • to declare most common behavior. we hope 90% of apis can be just declared using starndard auth and paginators
  • to easily add new endpoints
  • to further ease using of incremental

To declare an endpoint we basically provide kwargs to the paginated_request. the declaration can also take default authenticator and paginator

then in the dlt.source user can enumerate all paginated_requests and create dlt.resources out of them. there should be also a method to get a dynamic endpoint out of it (then it works like Alex's factory)

Approach
Currently you try to fit a tool into existing code. So you get those complicated conditional paginators serving many different endpoint types. All try something different: let's build a decent API and then fit our verified sources. starting with simple stuff like pokemon and then going for zendesk..

sources/api_client.py Outdated Show resolved Hide resolved
Copy link
Contributor

@sultaniman sultaniman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks cool, I also wonder if pagination strategy should also take params from .dlt/config.yml?

sources/api_client.py Outdated Show resolved Hide resolved
sources/api_client.py Outdated Show resolved Hide resolved
sources/api_client.py Outdated Show resolved Hide resolved
sources/api_client.py Outdated Show resolved Hide resolved
sources/api_client.py Outdated Show resolved Hide resolved
sources/api_client.py Outdated Show resolved Hide resolved
def get(self, path="", params=None):
return self.make_request(path, method="get", params=params)

def post(self, path="", json=None):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we can also use typing override to define signature variances?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sultaniman could you elaborate on this one?

sources/api_client.py Outdated Show resolved Hide resolved
sources/zendesk/helpers/paginators.py Outdated Show resolved Hide resolved
sources/rest_api/client.py Outdated Show resolved Hide resolved
sources/rest_api/detector.py Outdated Show resolved Hide resolved
sources/rest_api/detector.py Outdated Show resolved Hide resolved
@@ -3,7 +3,7 @@
from sources.notion.helpers.database import NotionDatabase
from sources.notion.helpers.client import NotionClient


@pytest.mark.skip
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Skipping this since it's testing the former client

@@ -21,7 +21,7 @@ def test_all_resources(destination_name: str) -> None:
table_counts = load_table_counts(pipeline, *table_names)

assert table_counts["employees"] >= 31
assert table_counts["absence_types"] >= 6
assert table_counts["absence_types"] >= 5
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adjusting for a fresh Personio account.

self.base_url = base_url
self.headers = headers
self.auth = auth
self.paginator = paginator if paginator else HeaderLinkPaginator()
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what should be a default here, so I went with a personal preference.

sources/api_client.py Outdated Show resolved Hide resolved
sources/api_client.py Outdated Show resolved Hide resolved
sources/rest_api/client.py Show resolved Hide resolved
sources/rest_api/__init__.py Outdated Show resolved Hide resolved
sources/rest_api/__init__.py Outdated Show resolved Hide resolved
)

PaginatorConfigDict = Dict[str, Any]
PaginatorType = Union[Any, BasePaginator, str, PaginatorConfigDict]
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in ba46fda by dlt-hub/dlt#1082
Thanks again @sultaniman for the review!

sources/rest_api/client.py Show resolved Hide resolved
@burnash burnash requested a review from rudolfix March 14, 2024 10:39
Copy link
Contributor

@rudolfix rudolfix left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO (for later)

  1. "Endpoint" does not make it clear if single entity is returned or a list of entities. should we have "none" paginator to indicate that? or some boolean flag

  2. What is the use of SinglePage paginator now? we have this add_limit which makes any paginator single page

  3. I should be able to bind same incremental to start/since param (start_value) and to end/before param (end_value).

  4. I should be able to bind same transformer to many params when id is compound (this is less important)

  5. paginators should be configspecs (dataclasses) so they can be configured and initialized like authentiators. we could use the same factory approach for both - I can do this one

  6. RESTAPIConfig should accept an instance of DltResource here to allow completely code to be mixed with declared endpoint resources

resources: List[Union[str, EndpointResource]]

(this is less important)
7. several types in typing.py must go to dlt core ASAP. I can also handle that. we could also generate typed dicts automatically for all our configspecs.

sources/rest_api/__init__.py Show resolved Hide resolved
param_name=param_name,
field_path=resolved_param.resolve_config.field_path,
):
items = items or []
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should allow for items that are not lists and encapsulate them. also "items" that evaluate to False - that rather should not happen

sources/rest_api/client.py Show resolved Hide resolved
sources/rest_api/__init__.py Outdated Show resolved Hide resolved
sources/rest_api/config_setup.py Show resolved Hide resolved
sources/rest_api/config_setup.py Show resolved Hide resolved
sources/rest_api/detector.py Show resolved Hide resolved
sources/rest_api/paginators.py Outdated Show resolved Hide resolved
sources/rest_api/paginators.py Show resolved Hide resolved
sources/rest_api/paginators.py Outdated Show resolved Hide resolved
Copy link
Contributor

@rudolfix rudolfix left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

sources/rest_api/config_setup.py Show resolved Hide resolved
@burnash burnash merged commit 079a59f into master Mar 22, 2024
14 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants